From 9618232f4da325692dcf98fd6ff5b8abd9fce66c Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Thu, 4 Oct 2012 20:00:00 -0400 Subject: [PATCH] pull: Stage content asynchronously For similar reasons as metadata, this avoids having the main thread blocked in fdatasync(), and even better - we can achieve much higher parallelism if we have multiple threads blocked on fdatasync(). --- src/libostree/ostree-repo.c | 95 ++++++++++++++++++++++++++++ src/libostree/ostree-repo.h | 15 ++++- src/ostree/ostree-fetcher.c | 9 +++ src/ostree/ostree-fetcher.h | 2 + src/ostree/ostree-pull.c | 121 +++++++++++++----------------------- 5 files changed, 164 insertions(+), 78 deletions(-) diff --git a/src/libostree/ostree-repo.c b/src/libostree/ostree-repo.c index 88408876..ac3a3dc5 100644 --- a/src/libostree/ostree-repo.c +++ b/src/libostree/ostree-repo.c @@ -1558,6 +1558,101 @@ ostree_repo_stage_content (OstreeRepo *self, cancellable, error); } +typedef struct { + OstreeRepo *repo; + char *expected_checksum; + GInputStream *object; + guint64 file_object_length; + GCancellable *cancellable; + GSimpleAsyncResult *result; + + guchar *result_csum; +} StageContentAsyncData; + +static void +stage_content_async_data_free (gpointer user_data) +{ + StageContentAsyncData *data = user_data; + + g_clear_object (&data->repo); + g_clear_object (&data->cancellable); + g_clear_object (&data->object); + g_free (data->result_csum); + g_free (data->expected_checksum); + g_free (data); +} + +static void +stage_content_thread (GSimpleAsyncResult *res, + GObject *object, + GCancellable *cancellable) +{ + GError *error = NULL; + StageContentAsyncData *data; + + data = g_simple_async_result_get_op_res_gpointer (res); + if (!ostree_repo_stage_content (data->repo, data->expected_checksum, + data->object, data->file_object_length, + &data->result_csum, + cancellable, &error)) + g_simple_async_result_take_error (res, error); +} + +/** + * ostree_repo_stage_content_async: + * + * Asynchronously store the content object @object. If provided, + * the checksum @expected_checksum will be verified. + */ +void +ostree_repo_stage_content_async (OstreeRepo *self, + const char *expected_checksum, + GInputStream *object, + guint64 file_object_length, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + StageContentAsyncData *asyncdata; + + asyncdata = g_new0 (StageContentAsyncData, 1); + asyncdata->repo = g_object_ref (self); + asyncdata->expected_checksum = g_strdup (expected_checksum); + asyncdata->object = g_object_ref (object); + asyncdata->file_object_length = file_object_length; + asyncdata->cancellable = cancellable ? g_object_ref (cancellable) : NULL; + + asyncdata->result = g_simple_async_result_new ((GObject*) self, + callback, user_data, + ostree_repo_stage_content_async); + + g_simple_async_result_set_op_res_gpointer (asyncdata->result, asyncdata, + stage_content_async_data_free); + g_simple_async_result_run_in_thread (asyncdata->result, stage_content_thread, G_PRIORITY_DEFAULT, cancellable); + g_object_unref (asyncdata->result); +} + +gboolean +ostree_repo_stage_content_finish (OstreeRepo *self, + GAsyncResult *result, + guchar **out_csum, + GError **error) +{ + GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result); + StageContentAsyncData *data; + + g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == ostree_repo_stage_content_async); + + if (g_simple_async_result_propagate_error (simple, error)) + return FALSE; + + data = g_simple_async_result_get_op_res_gpointer (simple); + /* Transfer ownership */ + *out_csum = data->result_csum; + data->result_csum = NULL; + return TRUE; +} + static GVariant * create_empty_gvariant_dict (void) { diff --git a/src/libostree/ostree-repo.h b/src/libostree/ostree-repo.h index 62f51d36..2f7ac7a8 100644 --- a/src/libostree/ostree-repo.h +++ b/src/libostree/ostree-repo.h @@ -113,7 +113,7 @@ void ostree_repo_stage_metadata_async (OstreeRepo *self, gboolean ostree_repo_stage_metadata_finish (OstreeRepo *self, GAsyncResult *result, - guchar **out_checksum, + guchar **out_csum, GError **error); gboolean ostree_repo_stage_content (OstreeRepo *self, @@ -138,6 +138,19 @@ gboolean ostree_repo_stage_content_trusted (OstreeRepo *self, GCancellable *cancellable, GError **error); +void ostree_repo_stage_content_async (OstreeRepo *self, + const char *expected_checksum, + GInputStream *object, + guint64 file_object_length, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); + +gboolean ostree_repo_stage_content_finish (OstreeRepo *self, + GAsyncResult *result, + guchar **out_csum, + GError **error); + gboolean ostree_repo_resolve_rev (OstreeRepo *self, const char *rev, gboolean allow_noent, diff --git a/src/ostree/ostree-fetcher.c b/src/ostree/ostree-fetcher.c index bf6421a5..6d4b5dbb 100644 --- a/src/ostree/ostree-fetcher.c +++ b/src/ostree/ostree-fetcher.c @@ -82,6 +82,7 @@ struct OstreeFetcher GHashTable *message_to_request; /* SoupMessage -> SoupRequest */ guint64 total_downloaded; + guint total_requests; }; G_DEFINE_TYPE (OstreeFetcher, ostree_fetcher, G_TYPE_OBJECT) @@ -243,6 +244,8 @@ ostree_fetcher_request_uri_async (OstreeFetcher *self, OstreeFetcherPendingURI *pending; GError *local_error = NULL; + self->total_requests++; + pending = g_new0 (OstreeFetcherPendingURI, 1); pending->refcount = 1; pending->self = g_object_ref (self); @@ -352,3 +355,9 @@ ostree_fetcher_bytes_transferred (OstreeFetcher *self) { return self->total_downloaded; } + +guint +ostree_fetcher_get_n_requests (OstreeFetcher *self) +{ + return self->total_requests; +} diff --git a/src/ostree/ostree-fetcher.h b/src/ostree/ostree-fetcher.h index 803db729..4f6897e3 100644 --- a/src/ostree/ostree-fetcher.h +++ b/src/ostree/ostree-fetcher.h @@ -51,6 +51,8 @@ char * ostree_fetcher_query_state_text (OstreeFetcher *self); guint64 ostree_fetcher_bytes_transferred (OstreeFetcher *self); +guint ostree_fetcher_get_n_requests (OstreeFetcher *self); + void ostree_fetcher_request_uri_async (OstreeFetcher *self, SoupURI *uri, GCancellable *cancellable, diff --git a/src/ostree/ostree-pull.c b/src/ostree/ostree-pull.c index b189c0b2..ccbb0032 100644 --- a/src/ostree/ostree-pull.c +++ b/src/ostree/ostree-pull.c @@ -109,7 +109,9 @@ typedef struct { guint n_fetched_content; guint outstanding_filemeta_requests; guint outstanding_filecontent_requests; - guint outstanding_checksum_requests; + guint outstanding_content_stage_requests; + + guint64 previous_total_downloaded; GError **async_error; gboolean caught_error; @@ -185,18 +187,31 @@ uri_fetch_update_status (gpointer user_data) OtPullData *pull_data = user_data; ot_lfree char *fetcher_status; GString *status; + guint64 current_bytes_transferred; + guint64 delta_bytes_transferred; status = g_string_new (""); - g_string_append_printf (status, "%u/%u metadata %u/%u content fetched; ", + if (pull_data->metadata_scan_active) + g_string_append_printf (status, "scan: %u metadata; ", + g_atomic_int_get (&pull_data->n_scanned_metadata)); + + g_string_append_printf (status, "fetch: %u/%u metadata %u/%u content; ", g_atomic_int_get (&pull_data->n_fetched_metadata), g_atomic_int_get (&pull_data->n_requested_metadata), pull_data->n_fetched_content, g_atomic_int_get (&pull_data->n_requested_content)); - if (pull_data->outstanding_checksum_requests > 0) - g_string_append_printf (status, "Calculating %u checksums; ", - pull_data->outstanding_checksum_requests); + current_bytes_transferred = ostree_fetcher_bytes_transferred (pull_data->fetcher); + delta_bytes_transferred = current_bytes_transferred - pull_data->previous_total_downloaded; + pull_data->previous_total_downloaded = current_bytes_transferred; + + if (delta_bytes_transferred < 1024) + g_string_append_printf (status, "%u B/s; ", + (guint)delta_bytes_transferred); + else + g_string_append_printf (status, "%.1f KiB/s; ", + (double)delta_bytes_transferred / 1024); fetcher_status = ostree_fetcher_query_state_text (pull_data->fetcher); g_string_append (status, fetcher_status); @@ -245,7 +260,7 @@ check_outstanding_requests_handle_error (OtPullData *pull_data, pull_data->outstanding_uri_requests == 0 && pull_data->outstanding_filemeta_requests == 0 && pull_data->outstanding_filecontent_requests == 0 && - pull_data->outstanding_checksum_requests == 0) + pull_data->outstanding_content_stage_requests == 0) g_main_loop_quit (pull_data->loop); throw_async_error (pull_data, error); } @@ -507,79 +522,27 @@ destroy_fetch_one_content_item_data (OtFetchOneContentItemData *data) } static void -content_fetch_on_checksum_complete (GObject *object, - GAsyncResult *result, - gpointer user_data) +content_fetch_on_stage_complete (GObject *object, + GAsyncResult *result, + gpointer user_data) { OtFetchOneContentItemData *data = user_data; GError *local_error = NULL; GError **error = &local_error; - guint64 length; - GCancellable *cancellable = NULL; - gboolean compressed; - ot_lfree guchar *csum; - ot_lvariant GVariant *file_meta = NULL; - ot_lobj GFileInfo *file_info = NULL; - ot_lvariant GVariant *xattrs = NULL; - ot_lobj GInputStream *content_input = NULL; - ot_lobj GInputStream *file_object_input = NULL; - ot_lfree char *checksum; + ot_lfree guchar *csum = NULL; + ot_lfree char *checksum = NULL; - csum = ot_gio_checksum_stream_finish ((GInputStream*)object, result, error); - if (!csum) + if (!ostree_repo_stage_content_finish ((OstreeRepo*)object, result, + &csum, error)) goto out; checksum = ostree_checksum_from_bytes (csum); - if (strcmp (checksum, data->checksum) != 0) - { - g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, - "Corrupted object %s (actual checksum is %s)", - data->checksum, checksum); - goto out; - } - - compressed = data->pull_data->remote_mode == OSTREE_REPO_MODE_ARCHIVE_Z; - - if (compressed) - { - content_input = (GInputStream*)g_file_read (data->content_path, cancellable, error); - if (!content_input) - goto out; - if (!ostree_zlib_content_stream_open (content_input, &length, &file_object_input, - cancellable, error)) - goto out; - } - else - { - if (!ot_util_variant_map (data->meta_path, OSTREE_FILE_HEADER_GVARIANT_FORMAT, TRUE, - &file_meta, error)) - goto out; - - if (!ostree_file_header_parse (file_meta, &file_info, &xattrs, error)) - goto out; - - if (data->content_path) - { - content_input = (GInputStream*)g_file_read (data->content_path, cancellable, error); - if (!content_input) - goto out; - } - - if (!ostree_raw_file_to_content_stream (content_input, file_info, xattrs, - &file_object_input, &length, - cancellable, error)) - goto out; - } - - if (!ostree_repo_stage_content_trusted (data->pull_data->repo, checksum, - file_object_input, length, - cancellable, error)) - goto out; + g_assert (strcmp (checksum, data->checksum) == 0); data->pull_data->n_fetched_content++; out: - data->pull_data->outstanding_checksum_requests--; + data->pull_data->outstanding_content_stage_requests--; check_outstanding_requests_handle_error (data->pull_data, local_error); destroy_fetch_one_content_item_data (data); } @@ -623,6 +586,7 @@ content_fetch_on_complete (GObject *object, GCancellable *cancellable = NULL; gboolean was_content_fetch = FALSE; gboolean need_content_fetch = FALSE; + guint64 length; ot_lvariant GVariant *file_meta = NULL; ot_lobj GFileInfo *file_info = NULL; ot_lobj GInputStream *content_input = NULL; @@ -674,20 +638,21 @@ content_fetch_on_complete (GObject *object, if (!need_content_fetch && compressed) { ot_lobj GInputStream *uncomp_input = NULL; - guint64 uncompressed_len; g_assert (data->content_path != NULL); content_input = (GInputStream*)g_file_read (data->content_path, cancellable, error); if (!content_input) goto out; - if (!ostree_zlib_content_stream_open (content_input, &uncompressed_len, &uncomp_input, + if (!ostree_zlib_content_stream_open (content_input, &length, &uncomp_input, cancellable, error)) goto out; - - data->pull_data->outstanding_checksum_requests++; - ot_gio_checksum_stream_async (uncomp_input, G_PRIORITY_DEFAULT, NULL, - content_fetch_on_checksum_complete, data); + + data->pull_data->outstanding_content_stage_requests++; + ostree_repo_stage_content_async (data->pull_data->repo, data->checksum, + uncomp_input, length, + cancellable, + content_fetch_on_stage_complete, data); } else if (!need_content_fetch) { @@ -709,13 +674,15 @@ content_fetch_on_complete (GObject *object, } if (!ostree_raw_file_to_content_stream (content_input, file_info, xattrs, - &file_object_input, NULL, + &file_object_input, &length, cancellable, error)) goto out; - data->pull_data->outstanding_checksum_requests++; - ot_gio_checksum_stream_async (file_object_input, G_PRIORITY_DEFAULT, NULL, - content_fetch_on_checksum_complete, data); + data->pull_data->outstanding_content_stage_requests++; + ostree_repo_stage_content_async (data->pull_data->repo, data->checksum, + file_object_input, length, + cancellable, + content_fetch_on_stage_complete, data); } while (data->pull_data->outstanding_filemeta_requests < 10) -- 2.30.2